RocketMQ Connect クイックスタート
クイックスタート
このチュートリアルでは、スタンドアロンモードでRocketMQコネクタのサンプルプロジェクト「rocketmq-connect-sample」を開始して、コネクタの原則を理解するのに役立てていただきます。サンプルプロジェクトには、ソースファイルからデータを読み取ってRocketMQクラスタに送信するソースコネクタが用意されています。また、RocketMQクラスタからメッセージを読み取ってターゲットファイルに書き込むシンクコネクタも提供しています。
1. 準備: RocketMQの開始
- Linux/Unix/Mac
- 64ビットJDK 1.8以上;
- Maven 3.2.x以上;
- RocketMQを開始します。 RocketMQ 4.x か RocketMQ 5.x 5.xバージョンを使用できます;
- ツールを使用してRocketMQメッセージの送受信をテストします。
ここでは、環境変数 NAMESRV_ADDR を使用して、ツールクライアントに RocketMQ の NameServer アドレスを localhost:9876 として知らせます。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
注意: RocketMQには、トピックとグループを自動的に作成する機能があります。メッセージを送信またはサブスクライブするとき、対応するトピックまたはグループが存在しない場合、RocketMQは自動的にそれらを作成します。したがって、事前にトピックとグループを作成する必要はありません。
2. コネクタランタイムの構築
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
注意: このプロジェクトにはすでにrocketmq-connect-sampleのコードがデフォルトで含まれているため、rocketmq-connect-sampleプラグインを別途構築する必要はありません。
3. スタンドアロンモードでコネクタワーカーを実行する
構成の変更
connect-standalone.conf
ファイルを編集して、RocketMQ接続アドレスとその他の情報を構成します。詳細は 9. 設定ファイルの指示 を参照してください。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
スタンドアロンモードでは、RocketMQ Connectは同期チェックポイント情報をローカルファイルディレクトリ storePathRootDir に永続化します。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
同期のチェックポイントをリセットする場合は、永続的なチェックポイントファイルを削除する必要があります。
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
スタンドアロンモードでコネクタワーカーを開始する
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
tips: 必要に応じて、docker/connect/bin/runconnect.sh
を変更してJVMのスタートアップパラメータを調整できます。
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
スタートアップログファイルを参照するには
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
ランタイムが正常に開始されると、ログファイルに次のように表示されます。
スタンドアロンワーカーの起動が成功しました。
tail -f
コマンドのログ追跡モードを終了するには、Ctrl + C
キーの組み合わせを押します。
4. ソースコネクタを開始する
ソースファイルを作成し、テストデータを書き込む
mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
注意: 空行があってはいけません (空行があるとデモプログラムはエラーを起こします)。ソースコネクタはソースファイルを読み込み続け、各データ行をメッセージ本文に変換して、RocketMQに送信し、シンクコネクタが消費できるようにします。
ソースコネクタを開始する
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'
curlリクエストがステータス200を返すと、正常に作成されたことを示します。応答の例
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}
ログファイルを参照する
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
以下のログが表示された場合は、ファイルソースコネクタが正常に開始されたことを意味します。
connectorファイルSourceConnectorが開始され、ターゲットの状態STARTEDに正常に設定されました!!
ソースコネクタの構成手順
キー | null可能 | デフォルト | 説明 |
---|---|---|---|
connector.class | false | Connectorインターフェースを実装するクラス名 (パッケージ名を含む) | |
filename | false | ソースファイルの名前 (絶対パスを使用することを推奨) | |
connect.topicname | false | ファイルデータを同期するために必要なトピック |
5. シンクコネクタを開始する
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'
curlリクエストがステータス200を返すと、正常に作成されたことを示します。応答の例
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}
ログファイルを参照する
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
以下のログが表示された場合は、ファイルシンクコネクタが正常に開始されたことを意味します。
connectorファイルSinkConnectorが開始され、ターゲットの状態STARTEDに正常に設定されました!!
シンクコネクタが宛先ファイルにデータを書き込んだかどうかを確認する
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
test-sink-file.txtファイルが生成され、その内容がtest-source-file.txtと同じ場合、プロセス全体が正常に実行されていることを意味します。
ソースファイルtest-source-file.txtにテストデータの書き込みを続ける
cd /Users/YourUsername/rocketmqconnect/
echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt
# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
注意: rocketmq-connect-sample
はRocketMQトピックとの間でメッセージを送受信する際に通常メッセージ
を使用しているため、ファイルの内容の順序は異なる場合があります。これは順序付きメッセージ
とは異なり、通常メッセージ
を消費しても順序が保証されません。
シンクコネクタの構成手順
キー | null可能 | デフォルト | 説明 |
---|---|---|---|
connector.class | false | Connectorインターフェースを実装するクラス名 (パッケージ名を含む) | |
filename | false | シンクはデータをプルし、ファイルに保存します (絶対パスを使用することを推奨) | |
connect.topicnames | false | シンクが処理する必要があるデータメッセージのトピック |
ヒント: サンプルrocketmq-connect-sampleの構成ファイル手順は参考用であり、ソース/シンクコネクタによって構成が異なるため、特定のソース/シンクコネクタを参照してください。
6. コネクタの停止
コネクタを停止するための RESTful コマンド形式は、http://(ワーカー IP):(ポート)/connectors/(コネクタ名)/stop
です。
デモの 2 つのコネクタを停止するには、以下のコマンドを使用できます。
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
curl コマンドがステータス 200 を返した場合、コネクタが正常に停止されたことを示します。応答例
{"status":200,"body":"Connector[fileSinkConnector]deleted successfully"}
次のログメッセージが表示された場合、ファイルシンクコネクタが正常にシャットダウンされたことを意味します。
tail -100f ~/logs/rocketmqconnect/connect_default.log
connectorName:fileSinkConnector に対するシャットダウンが完了しました。
7. ワーカープロセスの停止
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh
8. ログディレクトリ
以下のコマンドを使用してログディレクトリを表示できます。
ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect
9. 設定ファイルの指示
使用方法に基づいて、RESTful ポート、storeRoot パス、ネームサーバーアドレス、およびその他の情報を変更します。
設定ファイルの例を次に示します。
#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1
# Http prot for user to access REST API
httpPort=8082
# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876
# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=
storePathRootDir 設定の説明
スタンドアロンモードでは、RocketMQ Connect は同期チェックポイント情報を storePathRootDir で指定されたローカルファイルディレクトリに永続化します。永続的なファイルには次のものが含まれます。
キー | 説明 |
---|---|
connectorConfig.json | コネクタコンフィギュレーション永続化ファイル |
position.json | ソースコネクトデータ処理進捗永続化ファイル |
taskConfig.json | タスクコンフィギュレーション永続化ファイル |
offset.json | シンクコネクトデータ使用進捗永続化ファイル |
connectorStatus.json | コネクタステータス永続化ファイル |
taskStatus.json | タスクステータス永続化ファイル |